Sentinel源码(九)集成SpringCloudGateway 您所在的位置:网站首页 sentinel 源码 Sentinel源码(九)集成SpringCloudGateway

Sentinel源码(九)集成SpringCloudGateway

2023-10-28 00:54| 来源: 网络整理| 查看: 265

案例

下面是官方提供的配置案例。

@Configuration public class GatewayConfiguration { private final List viewResolvers; private final ServerCodecConfigurer serverCodecConfigurer; public GatewayConfiguration(ObjectProvider viewResolversProvider, ServerCodecConfigurer serverCodecConfigurer) { this.viewResolvers = viewResolversProvider.getIfAvailable(Collections::emptyList); this.serverCodecConfigurer = serverCodecConfigurer; } @Bean @Order(Ordered.HIGHEST_PRECEDENCE) public SentinelGatewayBlockExceptionHandler sentinelGatewayBlockExceptionHandler() { return new SentinelGatewayBlockExceptionHandler(viewResolvers, serverCodecConfigurer); } @Bean @Order(-1) public GlobalFilter sentinelGatewayFilter() { return new SentinelGatewayFilter(); } @PostConstruct public void doInit() { // 加载自定义API分组 initCustomizedApis(); // 加载网关流控规则 initGatewayRules(); } }

简单介绍一下几个组件的作用:

SentinelGatewayBlockExceptionHandler:用于处理BlockException; SentinelGatewayFilter:适配SpringCloudGateway的GlobalFilter实现类; initCustomizedApis:加载自定义API分组; initGatewayRules:加载网关流控规则; 1、确定资源

默认情况下,Sentinel会使用Gateway的routeId路由id作为资源名称。

比如下面配置了两个路由,分别对应两个资源,资源名称即为route.id(aliyun_route和httpbin_route)。

spring: cloud: gateway: enabled: true routes: - id: aliyun_route uri: https://www.aliyun.com/ predicates: - Path=/product/** - id: httpbin_route uri: https://httpbin.org predicates: - Path=/httpbin/** filters: - RewritePath=/httpbin/(?.*), /${segment}

但是路由维度的流控规则很难满足需求。所以Sentinel在与Gateway集成时提出了API分组的概念。

比如通过Sentinel Dashboard的API管理中,可以新增API分组。

控制台-API分组.png

API分组也可以通过编码方式配置,如官方案例GatewayConfiguration的initCustomizedApis方法。

// GatewayConfiguration.java @PostConstruct public void doInit() { // 加载自定义API分组 initCustomizedApis(); // 加载网关流控规则 initGatewayRules(); } private void initCustomizedApis() { Set definitions = new HashSet(); ApiDefinition api1 = new ApiDefinition("some_customized_api") .setPredicateItems(new HashSet() {{ add(new ApiPathPredicateItem().setPattern("/ahas")); add(new ApiPathPredicateItem().setPattern("/product/**") .setMatchStrategy(SentinelGatewayConstants.URL_MATCH_STRATEGY_PREFIX)); }}); ApiDefinition api2 = new ApiDefinition("another_customized_api") .setPredicateItems(new HashSet() {{ add(new ApiPathPredicateItem().setPattern("/**") .setMatchStrategy(SentinelGatewayConstants.URL_MATCH_STRATEGY_PREFIX)); }}); definitions.add(api1); definitions.add(api2); GatewayApiDefinitionManager.loadApiDefinitions(definitions); }

ApiPathPredicateItem匹配模式。

public class ApiPathPredicateItem implements ApiPredicateItem { // 模式字符串 private String pattern; // 匹配策略 0-精确 1-前缀 2-正则 private int matchStrategy = SentinelGatewayConstants.URL_MATCH_STRATEGY_EXACT; }

ApiDefinition对应API分组。其中apiName会作为资源名称使用。

public class ApiDefinition { // 自定义api分组名称 private String apiName; // 匹配模式 private Set predicateItems; } 2、网关流控规则

Sentinel为Gateway专门设计了网关流控规则。

控制台-网关流控规则.png

Dashboard对应GatewayFlowRule。

public class GatewayFlowRule { // 资源名称 private String resource; // 资源类型(API类型)0-routeId 1-API分组 private int resourceMode = SentinelGatewayConstants.RESOURCE_MODE_ROUTE_ID; // 阈值类型 0-线程数 1-QPS private int grade = RuleConstant.FLOW_GRADE_QPS; // 阈值 private double count; // 间隔(时间窗口)(秒) private long intervalSec = 1; // 流控方式(流控效果)0-快速失败(令牌桶) 2-匀速排队(漏桶) private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT; // 应对突发请求时额外允许的请求数目 private int burst; // 在流控方式为匀速排队的情况下,排队最大时长 private int maxQueueingTimeoutMs = 500; // 针对请求属性 如ip、host、header private GatewayParamFlowItem paramItem; }

每个Gateway的流控规则,支持配置一个GatewayParamFlowItem,支持从ip、host、header中提取参数,作为方法入参,即SphU.entry方法的args参数。后文称配置了GatewayParamFlowItem的网关流控规则为:有参网关流控规则,反之为无参网关流控规则。

public class GatewayParamFlowItem { // 热点规则参数下标 --- 通过计算得到 private Integer index; // 解析策略 如 ip or header private int parseStrategy; // 参数名称 private String fieldName; // 匹配参数值模式 private String pattern; // 匹配策略 private int matchStrategy = SentinelGatewayConstants.PARAM_MATCH_STRATEGY_EXACT; } 3、管理网关流控规则

由于Sentinel区分了普通流控规则和网关流控规则,所以网关流控规则需要有单独的Manager管理。

GatewayRuleManager负责管理网关流控规则。

不仅如此,它还将GatewayFlowRule转换为了ParamFlowRule,这意味着网关热点参数规则复用了普通热点参数规则的逻辑。(普通热点参数规则是不支持普通流控规则的,后面看他怎么做二次适配)

public final class GatewayRuleManager { // 原始网关流控规则 private static final Map GATEWAY_RULE_MAP = new ConcurrentHashMap(); // 转换后 热点参数规则 private static final Map CONVERTED_PARAM_RULE_MAP = new ConcurrentHashMap(); }

GatewayRuleManager.GatewayRulePropertyListener的applyGatewayRuleInternal方法,将网关流控规则,转换为热点参数规则。主要逻辑分为有参网关流控规则和无参网关流控规则。

// GatewayRuleManager.GatewayRulePropertyListener.java private synchronized void applyGatewayRuleInternal(Set conf) { if (conf == null || conf.isEmpty()) { applyToConvertedParamMap(new HashSet()); GATEWAY_RULE_MAP.clear(); return; } // 资源 - 网关流控规则集合 Map gatewayRuleMap = new ConcurrentHashMap(); // 资源 - 参数下标游标 对于有参网关流控规则才有用 Map idxMap = new HashMap(); // 网关流控规则 转换为 热点参数规则 Set paramFlowRules = new HashSet(); // 资源 - 无参网关流控规则 Map noParamMap = new HashMap(); for (GatewayFlowRule rule : conf) { String resourceName = rule.getResource(); if (rule.getParamItem() == null) { // ... 统计noParamMap } else { // 转换有参网关流控规则 int idx = getIdxInternal(idxMap, resourceName); if (paramFlowRules.add(GatewayRuleConverter.applyToParamRule(rule, idx))) { idxMap.put(rule.getResource(), idx + 1); } cacheRegexPattern(rule.getParamItem()); } Set ruleSet = gatewayRuleMap.get(resourceName); if (ruleSet == null) { ruleSet = new HashSet(); gatewayRuleMap.put(resourceName, ruleSet); } ruleSet.add(rule); } for (Map.Entry e : noParamMap.entrySet()) { List rules = e.getValue(); if (rules == null || rules.isEmpty()) { continue; } for (GatewayFlowRule rule : rules) { int idx = getIdxInternal(idxMap, e.getKey()); // 转换无参网关流控规则 paramFlowRules.add(GatewayRuleConverter.applyNonParamToParamRule(rule, idx)); } } // 保存热点参数规则 到CONVERTED_PARAM_RULE_MAP applyToConvertedParamMap(paramFlowRules); // 保存原始网关流控规则 到GATEWAY_RULE_MAP GATEWAY_RULE_MAP.clear(); GATEWAY_RULE_MAP.putAll(gatewayRuleMap); } private void applyToConvertedParamMap(Set paramFlowRules) { Map newRuleMap = ParamFlowRuleUtil.buildParamRuleMap( new ArrayList(paramFlowRules)); // 如果没有规则,清空热点流控规则 if (newRuleMap == null || newRuleMap.isEmpty()) { for (String resource : CONVERTED_PARAM_RULE_MAP.keySet()) { ParameterMetricStorage.clearParamMetricForResource(resource); } CONVERTED_PARAM_RULE_MAP.clear(); return; } // 清理老的热点流控规则Metric for (Map.Entry entry : CONVERTED_PARAM_RULE_MAP.entrySet()) { // ... } // 保存热点流控规则 CONVERTED_PARAM_RULE_MAP.clear(); CONVERTED_PARAM_RULE_MAP.putAll(newRuleMap); }

重点关注两个转换方法。

GatewayRuleConverter.applyToParamRule,将有参网关流控规则,转换为热点参数规则。

除了GatewayFlowRule拷贝属性到ParamFlowRule以外,对于GatewayParamFlowItem有配置pattern的情况,设置了热点规则的参数例外项。

generateNonMatchPassParamItem方法生成参数例外项。参数类型是String,阈值是100万,参数值匹配$NM字符串,理论上不会匹配任何入参。为什么这么做,也是为了适配原来的热点流控规则(方法没有入参,不会走热点流控规则校验)。

// GatewayRuleConverter.java static ParamFlowRule applyToParamRule(GatewayFlowRule gatewayRule, int idx) { ParamFlowRule paramRule = new ParamFlowRule(gatewayRule.getResource()) .setCount(gatewayRule.getCount()) .setGrade(gatewayRule.getGrade()) .setDurationInSec(gatewayRule.getIntervalSec()) .setBurstCount(gatewayRule.getBurst()) .setControlBehavior(gatewayRule.getControlBehavior()) .setMaxQueueingTimeMs(gatewayRule.getMaxQueueingTimeoutMs()) .setParamIdx(idx); GatewayParamFlowItem gatewayItem = gatewayRule.getParamItem(); // 设置index下标,用于后续热点规则校验,同一个资源的不同网关流控规则,index自增 gatewayItem.setIndex(idx); // 针对有GatewayParamFlowItem.pattern的情况,需要给热点参数规则加例外项 String valuePattern = gatewayItem.getPattern(); if (valuePattern != null) { paramRule.getParamFlowItemList().add(generateNonMatchPassParamItem()); } return paramRule; } static ParamFlowItem generateNonMatchPassParamItem() { return new ParamFlowItem().setClassType(String.class.getName()) .setCount(1000_0000) .setObject(SentinelGatewayConstants.GATEWAY_NOT_MATCH_PARAM); } // SentinelGatewayConstants.java public final class SentinelGatewayConstants { public static final String GATEWAY_NOT_MATCH_PARAM = "$NM"; }

GatewayRuleConverter.applyNonParamToParamRule,将无参网关流控规则,转换为热点参数规则,仅仅是拷贝GatewayFlowRule属性到ParamFlowRule上。无参网关流控规则最主要特点是,转换后热点规则的idx参数下标始终保持不变,是参数列表的最后一个元素。

// GatewayRuleConverter.java static ParamFlowRule applyNonParamToParamRule(GatewayFlowRule gatewayRule, int idx) { return new ParamFlowRule(gatewayRule.getResource()) .setCount(gatewayRule.getCount()) .setGrade(gatewayRule.getGrade()) .setDurationInSec(gatewayRule.getIntervalSec()) .setBurstCount(gatewayRule.getBurst()) .setControlBehavior(gatewayRule.getControlBehavior()) .setMaxQueueingTimeMs(gatewayRule.getMaxQueueingTimeoutMs()) .setParamIdx(idx); }

以上GatewayFlowRule转换为ParamFlowRule仅仅是网关流控适配热点参数规则的第一步。

总结来说如下图:

网关流控规则-热点参数规则.png

无参网关流控规则:热点规则参数下标等于有参规则数量,例外项为空,利用热点参数规则实现了普通流控规则;

有参网关流控规则:热点规则参数下标自增

无pattern的情况下,代表不需要做模式匹配,例外项为空,意思是针对args[x]不同参数分别做流量控制; 有pattern的情况下,代表需要做模式匹配,例外项为$NM,阈值100万,意思是仅针对匹配上的args[x]做流量控制,其他没匹配上的直接放行(走例外项阈值100万),仅做流量统计; 4、GlobalFilter

Sentinel实现了自己的全局拦截器SentinelGatewayFilter,在SpringCloudGateway中接入网关流控规则逻辑。

public class SentinelGatewayFilter implements GatewayFilter, GlobalFilter, Ordered { // 参数解析器 private final GatewayParamParser paramParser = new GatewayParamParser( new ServerWebExchangeItemParser()); @Override public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR); Mono asyncResult = chain.filter(exchange); // 路由id作为资源名称 if (route != null) { String routeId = route.getId(); // 解析入参 Object[] params = paramParser.parseParameterFor(routeId, exchange, r -> r.getResourceMode() == SentinelGatewayConstants.RESOURCE_MODE_ROUTE_ID); String origin = Optional.ofNullable(GatewayCallbackManager.getRequestOriginParser()) .map(f -> f.apply(exchange)) .orElse(""); asyncResult = asyncResult.transform( new SentinelReactorTransformer(new EntryConfig(routeId, ResourceTypeConstants.COMMON_API_GATEWAY, EntryType.IN, 1, params, new ContextConfig(contextName(routeId), origin))) ); } // 自定义API分组作为资源名称 Set matchingApis = pickMatchingApiDefinitions(exchange); for (String apiName : matchingApis) { // 解析入参 Object[] params = paramParser.parseParameterFor(apiName, exchange, r -> r.getResourceMode() == SentinelGatewayConstants.RESOURCE_MODE_CUSTOM_API_NAME); asyncResult = asyncResult.transform( new SentinelReactorTransformer(new EntryConfig(apiName, ResourceTypeConstants.COMMON_API_GATEWAY, EntryType.IN, 1, params)) ); } return asyncResult; } }

SentinelGatewayFilter的filter方法:

根据请求,选择资源,routeId是肯定存在的,另外自定义API分组可能存在; 使用GatewayParamParser解析器,解析params参数,作为后期SphU.entry的args入参; SentinelReactorTransformer负责实际拦截逻辑; 4-1、GatewayParamParser

GatewayParamParser解析器,是适配热点参数规则的第二步。

public class GatewayParamParser { private final RequestItemParser requestItemParser; public Object[] parseParameterFor(String resource, T request, Predicate rulePredicate) { if (StringUtil.isEmpty(resource) || request == null || rulePredicate == null) { return new Object[0]; } // 有参网关流控规则 Set gatewayRules = new HashSet(); Set predSet = new HashSet(); boolean hasNonParamRule = false; for (GatewayFlowRule rule : GatewayRuleManager.getRulesForResource(resource)) { if (rule.getParamItem() != null) { gatewayRules.add(rule); predSet.add(rulePredicate.test(rule)); } else { hasNonParamRule = true; } } if (!hasNonParamRule && gatewayRules.isEmpty()) { return new Object[0]; } if (predSet.size() > 1 || predSet.contains(false)) { return new Object[0]; } // 数组大小 int size = hasNonParamRule ? gatewayRules.size() + 1 : gatewayRules.size(); Object[] arr = new Object[size]; for (GatewayFlowRule rule : gatewayRules) { GatewayParamFlowItem paramItem = rule.getParamItem(); int idx = paramItem.getIndex(); // 从请求中匹配参数。如果pattern非空且匹配不中,返回$NM,命中例外项,不限制阈值; // 否则使用资源统一阈值 String param = parseInternal(paramItem, request); arr[idx] = param; } if (hasNonParamRule) { // $D arr[size - 1] = SentinelGatewayConstants.GATEWAY_DEFAULT_PARAM; } return arr; } private String parseInternal(GatewayParamFlowItem item, T request) { switch (item.getParseStrategy()) { case SentinelGatewayConstants.PARAM_PARSE_STRATEGY_CLIENT_IP: // ip return parseClientIp(item, request); case SentinelGatewayConstants.PARAM_PARSE_STRATEGY_HOST: // host return parseHost(item, request); case SentinelGatewayConstants.PARAM_PARSE_STRATEGY_HEADER: // header return parseHeader(item, request); case SentinelGatewayConstants.PARAM_PARSE_STRATEGY_URL_PARAM: // urlParam return parseUrlParameter(item, request); case SentinelGatewayConstants.PARAM_PARSE_STRATEGY_COOKIE: // cookie return parseCookie(item, request); default: return null; } } }

parseParameterFor方法,关注几个点:

入参数组大小:如果资源没有无参网关流控规则,数组大小=有参网关流控规则数量;反之,数组大小=无参网关流控规则数量+1; 无参网关流控规则存在的情况下,入参数组最后一位,用GATEWAY_DEFAULT_PARAM($D)填充。这一步是为了让最后一个参数存在,能进入热点参数规则,进行资源的所有无参数例外项的热点规则流控校验; 有参网关流控规则,用parseInternal方法解析规则参数,填充入参数组,数组下标取自于GatewayRuleConverter.applyToParamRule。parseInternal支持5种策略:ip、host、header、urlParam、cookie;

以GatewayParamFlowItem匹配ip为例,会进入parseClientIp方法。

如果pattern为空,表示当前网关流控规则没针对ip,只是取ip作为args入参;

如果pattern不为空,表示当前网关流控规则,针对某类ip生效,进入parseWithMatchStrategyInternal方法继续解析。

// GatewayParamParser.java private String parseClientIp(GatewayParamFlowItem item, T request) { String clientIp = requestItemParser.getRemoteAddress(request); String pattern = item.getPattern(); if (StringUtil.isEmpty(pattern)) { return clientIp; } return parseWithMatchStrategyInternal(item.getMatchStrategy(), clientIp, pattern); }

parseWithMatchStrategyInternal根据matchStrategy匹配策略、pattern模式匹配字符串(ip字符串需要满足的模式)匹配value原始字符串(ip)。

目前支持精确、包含、正则三种匹配策略,如果匹配成功,返回原始字符串,否则返回GATEWAY_NOT_MATCH_PARAM($NM)。

这与前面GatewayRuleConverter.applyToParamRule转换有参网关流控规则相呼应,代表ip如果不匹配,热点规则校验时流控阈值为100万,近似于直接放行。

// GatewayParamParser.java private String parseWithMatchStrategyInternal(int matchStrategy, String value, String pattern) { if (value == null) { return null; } switch (matchStrategy) { case SentinelGatewayConstants.PARAM_MATCH_STRATEGY_EXACT: return value.equals(pattern) ? value : SentinelGatewayConstants.GATEWAY_NOT_MATCH_PARAM; case SentinelGatewayConstants.PARAM_MATCH_STRATEGY_CONTAINS: return value.contains(pattern) ? value : SentinelGatewayConstants.GATEWAY_NOT_MATCH_PARAM; case SentinelGatewayConstants.PARAM_MATCH_STRATEGY_REGEX: Pattern regex = GatewayRegexCache.getRegexPattern(pattern); if (regex == null) { return value; } return regex.matcher(value).matches() ? value : SentinelGatewayConstants.GATEWAY_NOT_MATCH_PARAM; default: return value; } }

至此可以理解到网关流控规则和热点参数规则之间的关系:

无参网关流控规则,使用$D作为args中的最后一个入参,这是为了能够进入热点规则校验逻辑(如果参数为空,热点规则会直接放行,见ParamFlowChecker#passCheck)。对应热点参数规则没参数例外项,所以会用配置的阈值作为热点规则校验的阈值; 有参网关流控规则,在参数模式匹配成功的情况下,用原始字符串作为args入参,他不会匹配热点参数规则的参数例外项,所以也会使用配置的阈值作为热点规则校验的阈值; 有参网关流控规则,在参数模式匹配失败的情况下,用$NM作为args中的入参,它会匹配热点参数规则的参数例外项,但是例外项的阈值是100万,等同于直接放行; 4-2、SentinelReactorTransformer

SentinelReactorTransformer转换器,在sentinel-reactor-adapter模块中,用于适配基于reactor模式开发的框架,例如SpringWebFlux、SpringCloudGateway。每次请求,都会new一个SentinelReactorTransformer。

public class SentinelReactorTransformer implements Function { private final EntryConfig entryConfig; public SentinelReactorTransformer(String resourceName) { this(new EntryConfig(resourceName)); } public SentinelReactorTransformer(EntryConfig entryConfig) { AssertUtil.notNull(entryConfig, "entryConfig cannot be null"); this.entryConfig = entryConfig; } @Override public Publisher apply(Publisher publisher) { if (publisher instanceof Mono) { return new MonoSentinelOperator((Mono) publisher, entryConfig); } if (publisher instanceof Flux) { return new FluxSentinelOperator((Flux) publisher, entryConfig); } throw new IllegalStateException("Publisher type is not supported: " + publisher.getClass().getCanonicalName()); } }

SentinelReactorTransformer构造时传入了EntryConfig,里面存储了所有执行sentinel api需要的信息。

public class EntryConfig { // 资源名称 private final String resourceName; // 入口流量 or 出口流量 private final EntryType entryType; // 资源类型 route.id or api分组 private final int resourceType; // 获取数量 private final int acquireCount; // 入参 private final Object[] args; // 上下文配置 private final ContextConfig contextConfig; } public class ContextConfig { // 上下文 private final String contextName; // 来源 private final String origin; }

SentinelReactorTransformer的apply方法,将原有publisher做了一次包装。例如MonoSentinelOperator。

public class MonoSentinelOperator extends MonoOperator { private final EntryConfig entryConfig; @Override public void subscribe(CoreSubscriber


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

      专题文章
        CopyRight 2018-2019 实验室设备网 版权所有